Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request lays the groundwork for supporting Spark 4 as an execution engine for Apache Beam pipelines. It introduces a new runner built on Spark's Structured Streaming framework, initially focusing on batch processing. The changes cover the build configuration, core pipeline component translations, and metrics integration to ensure compatibility and observability within the Spark 4 ecosystem.
```groovy
apply plugin: 'org.apache.beam.module'
applyJavaNature(
  enableStrictDependencies: true,
  requireJavaVersion: (spark_version.startsWith("4") ? org.gradle.api.JavaVersion.VERSION_17 : null),
```
Spark 4 requires Java Version 17
/gemini review
Code Review
This pull request introduces support for Spark 4.0.2 to the Apache Beam Spark runner, which includes a new structured streaming module, dependency updates for Java 17 and Scala 2.13, and various cross-version compatibility fixes. The code review identifies a critical bug where an incorrect function cast in the stateful runner would lead to a ClassCastException at runtime. Additionally, the feedback points out that the pipeline cancellation logic is incomplete because it does not interrupt the underlying execution future, and it identifies redundant code in the BoundedDatasetFactory that should be removed for clarity.
If we prefer a cleaner implementation for Spark 4 at the cost of file duplication (or creating a version adapter), I can move the file and refactor it.
assign set of reviewers
Assigning reviewers: R: @Abacn for label build.

The PR bot will only process comments in the main thread (not review comments).
The only failing test is

It's now green. Let me know if I should split this further, @Abacn.
Add spark4_version (4.0.2) to BeamModulePlugin alongside the existing
spark3_version. Update spark_runner.gradle to conditionally select the
correct Scala library (2.13 vs 2.12), Jackson module, and Kafka test
dependency, and to require Java 17 when building against Spark 4.
Register the new :runners:spark:4 module in settings.gradle.kts.
These changes are purely additive — all conditionals gate on
spark_version.startsWith("4") or spark_scala_version == '2.13', leaving
the Spark 3 build path untouched.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Co-Authored-By: Claude Opus 4.6 <[email protected]>
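The gating pattern this commit describes can be pictured with a short Gradle sketch. This is a hedged illustration rather than the actual BeamModulePlugin/spark_runner.gradle code: spark_version and spark_scala_version match the commit message, but the version properties (scala_213_version, scala_212_version, jackson_version, kafka_version) and dependency coordinates are assumptions chosen for the example.

```groovy
// Hedged sketch of the additive, version-gated dependency selection described above.
// spark_version and spark_scala_version are assumed to be set by the surrounding build;
// all other property names and coordinates are illustrative.
def spark4 = spark_version.startsWith("4")

dependencies {
    if (spark_scala_version == '2.13') {
        implementation "org.scala-lang:scala-library:$scala_213_version"
        implementation "com.fasterxml.jackson.module:jackson-module-scala_2.13:$jackson_version"
    } else {
        implementation "org.scala-lang:scala-library:$scala_212_version"
        implementation "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jackson_version"
    }
    if (spark4) {
        // Spark 4 ships Scala 2.13 artifacts only, so the Kafka test dependency must
        // use the matching _2.13 suffix (exact coordinate is an assumption).
        testImplementation "org.apache.kafka:kafka_2.13:$kafka_version"
    }
}
```

The Spark 3 path is always the else/default branch, which is what keeps the change purely additive.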
Add the Gradle build file for the Spark 4 structured streaming runner. The module mirrors runners/spark/3/: it inherits the shared RDD-based source from runners/spark/src/ via copySourceBase and adds its own Structured Streaming implementation in src/main/java.

Key differences from the Spark 3 build:
- Uses spark4_version (4.0.2) with Scala 2.13.
- Excludes DStream-based streaming tests (Spark 4 supports only structured streaming batch).
- Unconditionally adds --add-opens JVM flags required by Kryo on Java 17 (Spark 4's minimum).
- Binds the Spark driver to 127.0.0.1 for macOS compatibility.

Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
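The last two points above are easy to sketch. The snippet below is a hedged illustration of the pattern, not the exact build file: the list of opened packages and the use of a system property for the driver bind address are assumptions for the example.

```groovy
// Hedged sketch of the Spark 4-specific test-JVM settings described above.
tasks.withType(Test).configureEach {
    // Kryo needs reflective access to java.base internals on JDK 17, Spark 4's minimum,
    // so the opens are applied unconditionally. The exact package list is illustrative.
    jvmArgs '--add-opens=java.base/java.lang=ALL-UNNAMED',
            '--add-opens=java.base/java.lang.invoke=ALL-UNNAMED',
            '--add-opens=java.base/java.util=ALL-UNNAMED',
            '--add-opens=java.base/java.nio=ALL-UNNAMED',
            '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED'
    // SparkConf picks up "spark."-prefixed JVM system properties, so this pins the
    // driver to loopback and avoids hostname-resolution issues on macOS.
    systemProperty 'spark.driver.bindAddress', '127.0.0.1'
}
```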
Add the Spark 4 structured streaming runner implementation and tests. Most files are adapted from the Spark 3 structured streaming runner with targeted changes for Spark 4 / Scala 2.13 API compatibility.

Key Spark 4-specific changes (diff against runners/spark/3/src/):
- EncoderFactory: Replaced the direct ExpressionEncoder constructor (removed in Spark 4) with BeamAgnosticEncoder, a named class implementing both AgnosticExpressionPathEncoder (for expression delegation via toCatalyst/fromCatalyst) and AgnosticEncoders.StructEncoder (so Dataset.select(TypedColumn) creates an N-attribute plan, preventing FIELD_NUMBER_MISMATCH). The toCatalyst/fromCatalyst methods substitute the provided input expression via transformUp, enabling correct nesting inside composite encoders like Encoders.tuple().
- EncoderHelpers: Added a toExpressionEncoder() helper to handle Spark 4 built-in encoders that are AgnosticEncoder subclasses rather than ExpressionEncoder.
- GroupByKeyTranslatorBatch: Migrated from the internal Catalyst Expression API (CreateNamedStruct, Literal$) to the public Column API (struct(), lit(), array()), as required by Spark 4.
- BoundedDatasetFactory: Use classic.Dataset$.MODULE$.ofRows(), as Dataset moved to org.apache.spark.sql.classic in Spark 4.
- ScalaInterop: Replace WrappedArray.ofRef (removed in Scala 2.13) with JavaConverters.asScalaBuffer().toList() in seqOf().
- GroupByKeyHelpers, CombinePerKeyTranslatorBatch: Replace TraversableOnce with IterableOnce (Scala 2.13 rename).
- SparkStructuredStreamingPipelineResult: Replace sparkproject.guava with Beam's vendored Guava.

Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Add GitHub Actions workflows for the Spark 4 runner module:
- beam_PreCommit_Java_Spark4_Versions: runs sparkVersionsTest on changes to runners/spark/**. Currently a no-op (the sparkVersions map is empty) but scaffolds future patch-version coverage.
- beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming: runs the structured streaming test suite on Java 17.

Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
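For context, the sparkVersionsTest mechanism mentioned above can be pictured as a map-driven fan-out in the runner's Gradle build; the sketch below shows why an empty map makes the workflow a no-op. Everything except the sparkVersions map and the sparkVersionsTest task name is an assumption for illustration, not the actual build code.

```groovy
// Hedged sketch: an empty sparkVersions map produces an aggregate task with no
// dependencies, so the PreCommit workflow currently runs as a no-op.
def sparkVersions = [:]  // e.g. ["402": "4.0.2"] once patch-version coverage is added

def sparkVersionsTest = tasks.register('sparkVersionsTest') {
    group = 'Verification'
    description = 'Runs the test suite against every Spark version in sparkVersions.'
}

sparkVersions.each { label, fullVersion ->
    def perVersionTest = tasks.register("sparkVersion${label}Test", Test) {
        // Hand the requested Spark patch version to the tests (property name is illustrative).
        systemProperty 'beam.spark.version', fullVersion
    }
    sparkVersionsTest.configure { dependsOn perVersionTest }
}
```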
Remove endOfData() call in close method.
Add job-server and container build configurations for Spark 4, mirroring the existing Spark 3 job-server setup. The container uses eclipse-temurin:17 (Spark 4 requires Java 17). The shared spark_job_server.gradle gains a requireJavaVersion conditional for Spark 4 parent projects. Co-Authored-By: Claude Opus 4.6 <[email protected]>
The hostname binding hack is no longer needed now that the local machine resolves its hostname to 127.0.0.1 via /etc/hosts. Co-Authored-By: Claude Opus 4.6 <[email protected]>
Called out in /ultrareview as a missing contributor checklist item. Adds a Highlight line and a New Features / Improvements entry under the 2.74.0 Unreleased section, referencing issue apache#36841.
Per /ultrareview feedback: the one-line comment didn't make clear why the cast is safe. Expand it to note that SparkSession.builder() always returns a classic.SparkSession at runtime, which is why the downcast avoids reflection.
Per /ultrareview feedback: the fallback branch silently swallowed the second ClassNotFoundException. In practice one of the two classes is always present (Scala 2.12 vs 2.13 stdlib), but a silent skip could mask a broken classpath. Emit a LOG.warn instead.
Per /ultrareview feedback: the five `"$spark_version" >= "3.5.0"` checks were lexicographic string comparisons. They happened to work for 3.5.0 and 4.0.2 only because '4' > '3' as chars — a future "3.10.0" release would compare less than "3.5.0" and silently drop the Spark 3.5+ dependencies and exclusions. Introduce an `isSparkAtLeast` closure that tokenizes on `.` and `-`, keeps numeric parts, and compares component-by-component. Replace all five call sites.
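The fix maps to a small Groovy closure. Below is a hedged sketch of that comparison logic; the actual isSparkAtLeast closure in spark_runner.gradle may differ in naming and edge-case handling.

```groovy
// Hedged sketch of isSparkAtLeast: numeric, component-wise comparison instead of
// lexicographic string comparison.
def isSparkAtLeast = { String actual, String minimum ->
    // Tokenize on '.' and '-', keep numeric components: "4.0.2-rc1" -> [4, 0, 2]
    def parse = { String v -> v.tokenize('.-').findAll { it.isInteger() }.collect { it.toInteger() } }
    def a = parse(actual)
    def m = parse(minimum)
    def n = Math.max(a.size(), m.size())
    for (int i = 0; i < n; i++) {
        int left = i < a.size() ? a[i] : 0    // pad missing components with zeros
        int right = i < m.size() ? m[i] : 0
        if (left != right) return left > right
    }
    return true  // equal versions satisfy "at least"
}

assert isSparkAtLeast('4.0.2', '3.5.0')
assert isSparkAtLeast('3.10.0', '3.5.0')   // a lexicographic ">=" would get this wrong
assert !isSparkAtLeast('3.4.3', '3.5.0')
```

Because each component is compared as an integer, a hypothetical "3.10.0" now correctly sorts above "3.5.0".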
Thanks, and sorry for the late response. Since this is a large change, here are some generic comments:
See beam/runners/flink/flink_runner.gradle line 73 (at 621354f) and the other source-override logic in that build.gradle. Basically, the lowest version (3) is built on top of runners/spark/src/, and the higher version (4) combines the shared base with its own per-version overrides. We can copy-paste the logic from runners/flink/flink_runner.gradle#L73.

After the infra work is done, we only need to copy the sources that need changes to runners/spark/3/src, and for the review consider generating a diff between runners/spark/3/src and the same files in runners/spark/src, like what we did for Flink 2 support: https://gist.github.com/Abacn/693c181134f839f04c4c97b42ecd2405

It's fine to have a single commit or a few commits; 5+ commits won't help further. Alternatively, we can split the support into multiple PRs: first check in the spark_runner.gradle change and the classic Spark runner with applyJavaNature(publish=False) while work is in progress, then complete the support, and then the job-server support.
…pt Flink-style version overrides

Move runners/spark/3/src/.../structuredstreaming/ (the only sources Spark 3 shipped) into the shared runners/spark/src/, and replace the existing copySourceBase block in runners/spark/spark_runner.gradle with the per-version source-overrides layout used by runners/flink/flink_runner.gradle: the lowest spark_major builds straight from the shared base; higher majors get a Copy task with DuplicatesStrategy.INCLUDE that merges shared + previous majors + ./src so per-version files override.

Pure refactor; Spark 3's compiled output is unchanged. Prepares the tree for the Spark 4 runner (apache#36841 / apache#38212), which lands as a small overrides layer on top.
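The override layout this refactor (and the suggestion above) describes looks roughly like the following in Gradle. It is a hedged sketch modeled on the flink_runner.gradle approach; paths, task names, and the source-set wiring are illustrative rather than copied from the PR.

```groovy
// Hedged sketch of the per-version source-overrides layout described above.
def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
    duplicatesStrategy = DuplicatesStrategy.INCLUDE
    // Later `from` entries overwrite earlier ones in the destination, so per-version
    // files shadow the shared base without removing it from runners/spark/src.
    from "../src/main/java"        // shared base (runners/spark/src)
    from "../3/src/main/java"      // previous major(s), when building a higher major
    from "./src/main/java"         // this major's overrides
    into "${project.buildDir}/source-overrides/src/main/java"
}

// Compile from the merged tree instead of the checked-in per-version directory.
sourceSets.main.java.srcDirs = ["${project.buildDir}/source-overrides/src/main/java"]
compileJava.dependsOn copySourceOverrides
```

The design choice is that higher Spark majors only check in the files that actually differ, and the copy-and-shadow step reassembles a full source tree at build time.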
Thank you for taking the time and for the guidance, @Abacn!

PR 1 (refactor only, no Spark 4): #38233
PR 2 (Spark 4, on top of PR 1): branch

How would you like to proceed? Options I see:

Happy to switch the head branch of this PR to
Addresses #36841
This PR is split into smaller commits for easier review.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- Update `CHANGES.md` with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.